package com.cloudinnov.task.mq;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;

import com.alibaba.fastjson.JSON;
import com.cloudinnov.dao.BoardSolutionConfigDao;
import com.cloudinnov.dao.BoardTemplateDao;
import com.cloudinnov.dao.EquipmentsAttrDao;
import com.cloudinnov.dao.EquipmentsDao;
import com.cloudinnov.logic.AccidentWorkOrderLogic;
import com.cloudinnov.logic.AlarmWorkOrdersLogic;
import com.cloudinnov.logic.ControlSolutionLogic;
import com.cloudinnov.logic.FaultsLogic;
import com.cloudinnov.logic.TriggerLogic;
import com.cloudinnov.logic.WorkOrderLogic;
import com.cloudinnov.model.AccidentWorkOrder;
import com.cloudinnov.model.AlarmWorkOrders;
import com.cloudinnov.model.BoardSolutionConfig;
import com.cloudinnov.model.BoardTemplate;
import com.cloudinnov.model.ControlSolution;
import com.cloudinnov.model.Equipments;
import com.cloudinnov.model.EquipmentsAttr;
import com.cloudinnov.model.Faults;
import com.cloudinnov.model.FireCRTEvent;
import com.cloudinnov.model.Trigger;
import com.cloudinnov.model.WorkOrder;
import com.cloudinnov.utils.CommonUtils;
import com.cloudinnov.utils.JudgeNullUtil;
import com.cloudinnov.websocket.BoardControlSolutionDialogueWebsocket;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * 故障处理服务。监听kafka发过来的故障码，生成工单入库
 * @ClassName: FaultKafkaConsumer
 * @Description: TODO
 * @author: ningmeng
 * @date: 2016年12月8日 下午2:09:47
 */
public class FaultMQConsumer implements ChannelAwareMessageListener {
	static final Logger LOG = LoggerFactory.getLogger(FaultMQConsumer.class);
	static final String DEFAULT_CHARSET = "UTF-8";
	private static final String SECTION_CODE = "default";
	private static final String PADDING_BY = "PaddingBy";
	private static final String SPLITTER_LEVEL1 = ",";
	private static final String FAULT_CODE = "faultCode:";
	private static final String FAULT_RECORD = ":record";
	private static final String FAULT_COUNT = "faultCount";
	static final long DELIVERIED_TAG = -1;
	@Autowired
	private JedisPool jedisPool;
	@Autowired
	private AlarmWorkOrdersLogic awoLogic;
	@Autowired
	private TriggerLogic triggerLogic;
	@Autowired
	private FaultsLogic faultsLogic;
	@Autowired
	private AlarmWorkOrdersLogic alarmWorkOrdersLogic;
	@Autowired
	private WorkOrderLogic workOrderLogic;
	@Autowired
	private AccidentWorkOrderLogic accidentWorkOrderLogic;
	@Autowired
	private ControlSolutionLogic ControlSolutionLogic;
	@Autowired
	private EquipmentsAttrDao equipmentsAttrDao;
	@Autowired
	private BoardSolutionConfigDao boardSolutionConfigDao;
	@Autowired
	private EquipmentsDao equipmentsDao;
	@Autowired
	private BoardTemplateDao boardTemplateDao;

	@Override
	public void onMessage(Message message, Channel channel) throws IOException {
		LOG.debug("entering kafka OnMessage, data :{}", new String(message.getBody()));
		String receiveMsg = null;
		try {
			receiveMsg = new String(message.getBody(), DEFAULT_CHARSET);
		} catch (UnsupportedEncodingException e1) {
			LOG.error("Subscribe fault data is error, data: {}, error: {}", message, e1);
		}
		String channelCode, faultCode;
		String[] faultMsgArray = receiveMsg.trim().split(SPLITTER_LEVEL1);
		if (JudgeNullUtil.iAryRegStr(faultMsgArray)) {
			Jedis redisFault = null;
			try {
				if (faultMsgArray.length == 1) {
					redisFault = jedisPool.getResource();
					redisFault.select(CommonUtils.REDIS_FAULT_VALUEDB);
					faultCode = faultMsgArray[0];
					LOG.debug("Consumer data:faultCode:[" + faultCode + "]");
					if (CommonUtils.isNotEmpty(faultCode)) {
						String faultCount = redisFault.hget(FAULT_CODE + faultCode + FAULT_RECORD, FAULT_COUNT);
						if (faultCount == null) {
							redisFault.hset(FAULT_CODE + faultCode + FAULT_RECORD, FAULT_COUNT, "1");
							Faults fault = new Faults();
							fault.setFaultCode(faultCode);
							List<Faults> faults = faultsLogic.selectListByFaultCode(fault);
							if (JudgeNullUtil.iList(faults)) {
								faults.get(0).setHappenTime(System.currentTimeMillis());
								alarmWorkOrdersLogic.sendMesssageToDialogueWebSocket(JSON.toJSONString(faults.get(0)));
								WorkOrder workOrder = new WorkOrder();
								workOrder.setSectionCode(SECTION_CODE);
								workOrder.setTitle(faults.get(0).getFaultName());
								workOrder.setContent(faults.get(0).getDescription());
								workOrder.setType(WorkOrder.ACCIDENT_WORK_ORDER);
								workOrder.setLevel(faults.get(0).getLevel());
								workOrder.setOrderStatus(CommonUtils.ORDER_STATUS_NEW);
								workOrder.setPaddingBy(PADDING_BY);
								workOrderLogic.save(workOrder);
								AccidentWorkOrder accWordOrder = new AccidentWorkOrder();
								accWordOrder.setFaultCode(faultCode);
								accWordOrder.setDescription(faults.get(0).getDescription());
								accWordOrder.setHandingSuggestion(faults.get(0).getHandlingSuggestion());
								accWordOrder.setFirstTime(System.currentTimeMillis() + "");
								accidentWorkOrderLogic.save(accWordOrder);
								if (JudgeNullUtil.iList(faults)) {
									triggerLogic.controlBoardSolutionByFaultcode(faultCode);
								}
							}
						} else {
							redisFault.hincrBy(FAULT_CODE + faultCode + FAULT_RECORD, FAULT_COUNT, 1);
						}
					}
				} else {
					channelCode = faultMsgArray[0];
					faultCode = faultMsgArray[1];
					LOG.debug("Consumer data:channelCode:[" + channelCode + "],faultCode:[" + faultCode + "]");
					List<BoardSolutionConfig> data = new ArrayList<>();
					if (CommonUtils.isNotEmpty(channelCode) && CommonUtils.isNotEmpty(faultCode)) {
						List<AlarmWorkOrders> awoInfo = awoLogic.selectFaultInfoByChannelAndFaultCode(channelCode,
								faultCode);
						if (JudgeNullUtil.iList(awoInfo)) {
							Faults fault = new Faults();
							fault.setFaultCode(faultCode);
							List<Faults> faults = faultsLogic.selectListByFaultCode(fault);
							if (JudgeNullUtil.iList(faults)) {
								alarmWorkOrdersLogic.sendMesssageToDialogueWebSocket(JSON.toJSONString(faults.get(0)));
							}
							// triggerLogic.controlBoardSolutionByFaultcode(faultCode);
							int returnCode = awoLogic.save(awoInfo);
							if (returnCode == CommonUtils.DEFAULT_NUM) {
								int attemptCount = 0;
								while (attemptCount < 5) {
									awoLogic.save(awoInfo);// 尝试5次重新插库
									attemptCount++;
								}
							}
						}
						// 根据故障码查询触发器
						Map<String, Object> webSocketContent = new HashMap<String, Object>();
						FireCRTEvent fireEvent = new FireCRTEvent();
						fireEvent.setEvent(channelCode);
						fireEvent.setDeviceId(faultMsgArray[2]);
						fireEvent.setDevice(faultMsgArray[3]);
						fireEvent.setPosition(faultMsgArray[4]);
						fireEvent.setOccurrenceTime(Long.valueOf(faultMsgArray[5]));
						List<Trigger> tiggerInfo = triggerLogic.selectTriggerByFaultCode();
						String triggerCodes = null;
						for (Trigger trigger : tiggerInfo) {
							List<Faults> parseArray = JSON.parseArray(trigger.getCondition(), Faults.class);
							for (Faults faults : parseArray) {
								if (faultCode.equals(faults.getFaultCode())) {
									triggerCodes = trigger.getCode();
								}
							}
						}
						// 根据故障码查询情报板方案
						List<ControlSolution> list = null;
						if (triggerCodes != null) {
							list = ControlSolutionLogic.selectControlSolutionByFaultCode(triggerCodes);
							for (ControlSolution controlSolution : list) {
								BoardSolutionConfig model = new BoardSolutionConfig();
								model.setSolutionCode(controlSolution.getCode());
								List<BoardSolutionConfig> boardSolutionConfigs = boardSolutionConfigDao
										.selectListBySolutionCode(model);
								Equipments equModel = null;
								Set<String> codes = new HashSet<String>();
								List<BoardSolutionConfig> newBoardSolutionConfigs = new ArrayList<>();
								List<Map<String, Object>> equMaps = new ArrayList<Map<String, Object>>();
								for (BoardSolutionConfig item : boardSolutionConfigs) {
									if (!codes.contains(item.getEquipmentCode())) {
										codes.add(item.getEquipmentCode());
										equModel = new Equipments();
										equModel.setCode(item.getEquipmentCode());
										equModel.setLanguage(CommonUtils.LOGIN_LANGUAGE);
										equModel = equipmentsDao.selectEntityByCondition(equModel);
										if (equModel != null) {
											Map<String, Object> equMap = new HashMap<String, Object>();
											Map<String, Object> config = new HashMap<String, Object>();
											List<EquipmentsAttr> attrList = equipmentsAttrDao
													.selectListByEquipmentCode(item.getEquipmentCode());// 根据设备编码查询设备属性
											for (EquipmentsAttr equipmentsAttr : attrList) {
												if (CommonUtils.isNotEmpty(equipmentsAttr.getCustomTag())) {
													config.put(equipmentsAttr.getCustomTag(),
															equipmentsAttr.getValue());
												}
											}
											equMap.put("code", equModel.getCode());
											equMap.put("name", equModel.getName());
											equMap.put("configs", config);
											equMaps.add(equMap);
										}
									}
									if (!codes.contains(item.getBoardTemplateCode())) {
										item.setCode(item.getBoardTemplateCode());
										codes.add(item.getBoardTemplateCode());
										BoardTemplate boardTemplate = new BoardTemplate();
										boardTemplate.setLanguage(CommonUtils.LOGIN_LANGUAGE);
										boardTemplate.setCode(item.getBoardTemplateCode());
										boardTemplate = boardTemplateDao.selectEntityByCondition(boardTemplate);
										item.setName(boardTemplate.getName());
										item.setList(
												JSON.parseArray(item.getSendContent(), BoardSolutionConfig.Data.class));
										newBoardSolutionConfigs.add(item);
									}
									controlSolution.setEquipmentClassify(item.getEquipmentClassify());
								}
								controlSolution.setEquipments(equMaps);
								controlSolution.setBoardSolutionConfig(newBoardSolutionConfigs);
								List<ControlSolution> lists = new ArrayList<ControlSolution>();
								lists.add(controlSolution);
								webSocketContent.put("data", fireEvent);
								webSocketContent.put("list", lists);
								ObjectMapper mapper = new ObjectMapper();
								mapper.setSerializationInclusion(Include.NON_NULL);
								sendBoardMesssageToDialogueWebSocket(mapper.writeValueAsString(webSocketContent));
							}
							// List<EquipmentsAttr> attrList = equipmentsAttrDao
							// .selectListByEquipmentCode(controlSolution.getEquipmentCode());//
							// 根据设备编码查询设备属性
							// attr = new HashMap<String, Object>();
							// for (EquipmentsAttr equipmentsAttr : attrList) {
							// if (CommonUtils.isNotEmpty(equipmentsAttr.getCustomTag())) {
							// attr.put(equipmentsAttr.getCustomTag(), equipmentsAttr.getValue());
							// }
							// }
							// configs = new ArrayList<>();
							// if (CommonUtils.isNotEmpty(controlSolution.getConfig())) {
							// configs.addAll(JSON.parseArray(controlSolution.getConfig(),
							// BoardSolutionConfig.Data.class));
							// }
							// BoardSolutionConfig config = new BoardSolutionConfig();
							// fireEvent.setPosition(controlSolution.getName());
							// config.setEquipmentCode(controlSolution.getEquipmentCode());
							// config.setSolutionCode(controlSolution.getCode());
							// config.setEquipmentName(controlSolution.getEquipmentName());
							// config.setEquipmentCodes(attr);
							// if (JudgeNullUtil.iList(configs)) {
							// config.setList(configs);
							// data.add(config);
							// }
							// }
						}
					}
				}
			} catch (Exception e) {
				LOG.error("[method]:onMessage:\t" + e);
			} finally {
				long deliveryTag = message.getMessageProperties().getDeliveryTag();
				if (deliveryTag != DELIVERIED_TAG) {
					channel.basicAck(deliveryTag, false);
					message.getMessageProperties().setDeliveryTag(DELIVERIED_TAG);
					LOG.info("revice and ack msg: " + (receiveMsg == null ? message : receiveMsg));
				}
				jedisPool.returnResource(redisFault);
			}
		}
	}
	public int sendBoardMesssageToDialogueWebSocket(String content) {
		// 遍历所有连接客户工单WebSocket 推送对话
		Iterator<Map.Entry<String, BoardControlSolutionDialogueWebsocket>> boardControlSolutionDialogueWebsocket = BoardControlSolutionDialogueWebsocket.webSocketMap
				.entrySet().iterator();
		while (boardControlSolutionDialogueWebsocket.hasNext()) {
			Map.Entry<String, BoardControlSolutionDialogueWebsocket> boardControlSolutionDialogueEntry = boardControlSolutionDialogueWebsocket
					.next();
			try {
				boardControlSolutionDialogueEntry.getValue().sendMessage(content);
			} catch (IOException e) {
			}
		}
		return 0;
	}
}
